${\bf \large \cal Hovhannes \ Grigoryan}\\ {\rm \small New \ York, \ NY}$
We discuss a basic implementation of the Hidden Markov Model (HMM) and various applications to financial time series data.
%%html
<style>
/* Notebook-wide typography: black serif text at 12pt. */
body, p, div.rendered_html {
color: black;
/* "Times New Roman" is the real family name ("Times Roman" does not match),
   and the generic fallback for a Times face is serif, not sans-serif. */
font-family: "Times New Roman", Times, serif;
font-size: 12pt;
}
</style>
import numpy as np
import datetime
from datetime import datetime, timedelta
import json
import quandl
from hmmlearn import hmm
from hmmlearn.hmm import GaussianHMM
from sklearn.preprocessing import StandardScaler
import seaborn as sns
from matplotlib import cm, pyplot as plt
from matplotlib.dates import YearLocator, MonthLocator
import seaborn as sns
from utils.make_media import *
from utils.html_converter import html_converter
from IPython.display import HTML, Image, clear_output
import warnings
warnings.filterwarnings('ignore')
# --- Notebook display, plotting defaults, and data-source credentials ---
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
plt.style.use('fivethirtyeight')
plt.rcParams['figure.figsize'] = (14,8)
# Fix the RNG seed so the synthetic examples below are reproducible.
np.random.seed(42)
# Read the Quandl API key from a local (untracked) config file.
with open('config.json') as fh:
    auth_tok = json.load(fh)["quandl_api_key"]
quandl.ApiConfig.api_key = auth_tok
# Compatibility shim: older pandas_datareader expects is_list_like at this
# location in pandas.
import pandas as pd
pd.core.common.is_list_like = pd.api.types.is_list_like
from pandas_datareader import data as pdr
import yfinance as yf
# Route pandas_datareader's Yahoo Finance calls through yfinance.
yf.pdr_override()
def get_yahoo_data(stocks, start_date, end_date, column="Adj Close"):
    """Download Yahoo Finance quotes for the given tickers.

    Returns the requested column(s) of the downloaded frame, or None
    (after printing the error) when the download or selection fails.
    """
    try:
        frame = pdr.get_data_yahoo(stocks, start_date, end_date)
        return frame[column]
    except Exception as err:
        print("Problem reading stock data: {}".format(err))
        return None
def add_lags(df, col, num_lags):
    """Append lagged copies of a column to a dataframe.

    Inputs:
    df - pandas dataframe that contains column "col"
    col - string (column name to be lagged)
    num_lags - number of lags or shifts
    Returns:
    dataframe appended with lagged values of column "col"
    """
    result = df.copy()
    # range(1, n+1) is empty when num_lags <= 0, so no guard is needed.
    for lag in range(1, num_lags + 1):
        result[col + '_lag' + str(lag)] = df[col].shift(lag)
    return result
def HMM_forward(pi, a, b, obs):
    """ Forward Algorithm for the HMM.
    parameters:
    pi: start probability vector (probability to be in a given hidden state at t=0)
    a[i,j]: transition probability matrix (from hidden state i to j)
    b[j,k]: emission probability matrix (from hidden state j to observable state k)
    obs[t]: observable state at time t, is a natural number, not equal to the value of observable state
    returns:
    alpha[j, t]: probability to be in a hidden state j at time t with observable value obs[t]
    c[t]: sum of alpha[j,t] over all hidden states j.
    """
    nStates = np.shape(a)[0]  # number of hidden states inferred from the size of the transition matrix a
    T = np.shape(obs)[0]      # number of time steps in the observation sequence
    alpha = np.zeros((nStates, T))
    # BUGFIX: the standard initialization is alpha_0(j) = pi_j * b[j, obs[0]].
    # The original set alpha[:, 0] = pi, omitting the first emission and
    # disagreeing with the Viterbi initialization used elsewhere in this file.
    # np.asarray keeps this correct when pi is a list or b is an np.matrix.
    alpha[:, 0] = np.asarray(pi) * np.asarray(b)[:, obs[0]]
    for t in range(1, T):
        for j in range(nStates):
            alpha[j, t] = b[j, obs[t]] * np.sum(alpha[:, t-1] * a[:, j])
    # Per-step scaling factors (column sums of alpha).
    c = np.ones((T))
    for t in range(T):
        c[t] = np.sum(alpha[:, t])
    return alpha, c
def HMM_backward(a, b, obs, c):
    """Backward pass of the HMM, with each column rescaled by the forward
    scaling factors c produced by HMM_forward."""
    n_states = np.shape(b)[0]
    n_steps = np.shape(obs)[0]
    beta = np.zeros((n_states, n_steps))
    # Base case: every state terminates with probability one at the last step.
    beta[:, n_steps - 1] = 1.0
    # Backward recursion over time.
    for t in reversed(range(n_steps - 1)):
        for state in range(n_states):
            beta[state, t] = np.sum(b[:, obs[t + 1]] * beta[:, t + 1] * a[state, :])
    # Rescale each time column by its forward scaling factor.
    for t in range(n_steps):
        beta[:, t] /= c[t]
    return beta
def HMM_Viterbi(pi, a, b, obs):
    """Greedy per-step state decoding from the forward probabilities.

    NOTE(review): despite the name, this is NOT the Viterbi algorithm - it
    independently picks argmax_j alpha[j, t] at every step (a filtering
    estimate) instead of maximizing over whole state sequences. The true
    Viterbi decoder is the separate ``Viterbi`` function in this file.

    parameters:
    pi, a, b, obs: as in HMM_forward
    returns:
    path[t]: hidden state with the largest forward probability at time t
    """
    alpha, c = HMM_forward(pi, a, b, obs)
    T = np.shape(obs)[0]
    path = np.zeros(T)
    for t in range(T):
        # Per-step argmax over forward probabilities (no backtracking).
        path[t] = np.argmax(alpha[:, t])
    return path
def HMM_BaumWelch(obs, nStates):
    """Baum-Welch (EM) estimation of (pi, a, b) for a discrete HMM.

    parameters:
    obs: 1-D integer array of observed symbols, values in 0..max(obs)
    nStates: number of hidden states to fit
    returns:
    pi, a, b: estimated start, transition and emission probabilities
    """
    T = np.shape(obs)[0]
    nSymbols = np.max(obs) + 1   # emission matrix has one column per symbol
    xi = np.zeros((nStates, nStates, T))
    # Initialise pi, a, b randomly
    #pi = 1./nStates*np.ones((nStates))
    pi = np.random.rand(nStates)
    pi /= np.sum(pi)
    a = np.random.rand(nStates, nStates)
    b = np.random.rand(nStates, nSymbols)
    tol = 1e-6
    error = tol + 1
    maxits = 1000
    nits = 0
    while ((error > tol) & (nits < maxits)):
        nits += 1
        oldpi = pi.copy()
        olda = a.copy()
        oldb = b.copy()
        # E step
        alpha, c = HMM_forward(pi, a, b, obs)
        beta = HMM_backward(a, b, obs, c)
        for t in range(T-1):
            for i in range(nStates):
                for j in range(nStates):
                    xi[i, j, t] = alpha[i, t] * a[i, j] * b[j, obs[t+1]] * beta[j, t+1]
            xi[:, :, t] /= np.sum(xi[:, :, t])
        # The last step has no b, beta in
        for i in range(nStates):
            for j in range(nStates):
                xi[i, j, T-1] = alpha[i, T-1] * a[i, j]
        xi[:, :, T-1] /= np.sum(xi[:, :, T-1])
        # M step
        for i in range(nStates):
            pi[i] = np.sum(xi[i, :, 0])
            for j in range(nStates):
                a[i, j] = np.sum(xi[i, j, :T-1]) / np.sum(xi[i, :, :T-1])
            # BUGFIX: iterate over every symbol 0..nSymbols-1. The original
            # used range(max(obs)), which never updated the emission column
            # of the largest observed symbol, leaving it at its random
            # initial value.
            for k in range(nSymbols):
                found = (obs == k).nonzero()
                b[i, k] = np.sum(xi[i, :, found]) / np.sum(xi[i, :, :])
        error = (np.abs(a-olda)).max() + (np.abs(b-oldb)).max()
        #print(nits, error, 1./np.sum(1./c), np.sum(alpha[:,T-1]))
    return pi, a, b
def Viterbi(pi, a, b, obs):
    """ Viterbi Algorithm for HMM. This is a Decoding Problem:
    Given the set of observable states find the most probable sequence of hidden states.
    parameters:
    pi: start probability vector (probability to be in a given hidden state at t=0)
    a[i,j]: transition probability matrix (from hidden state i to j)
    b[j,k]: emission probability matrix (from hidden state j to observable state k)
    obs[t]: observable state at time t, is a natural number, not equal to the value of observable state
    returns:
    path[t]: sequence of hidden states (most probable state at time t)
    delta[j, t]: largest of all probabilities to be in a hidden state j at time t with observable value obs[t]
    phi[j, t]: the hidden state that has the largest probability to be in at time t.
    """
    n_states = np.shape(a)[0]
    n_steps = np.shape(obs)[0]
    path = np.zeros(n_steps)
    delta = np.zeros((n_states, n_steps))
    phi = np.zeros((n_states, n_steps))
    # Initialization: start distribution weighted by the first emission.
    delta[:, 0] = pi * b[:, obs[0]]
    phi[:, 0] = 0
    # Recursion: for each state keep the best-scoring path reaching it,
    # and remember the predecessor that achieved the maximum.
    for step in range(1, n_steps):
        for state in range(n_states):
            scores = delta[:, step - 1] * a[:, state]
            delta[state, step] = np.max(scores) * b[state, obs[step]]
            phi[state, step] = np.argmax(scores)
    # Backtracking from the most probable final state.
    path[n_steps - 1] = np.argmax(delta[:, n_steps - 1])
    for step in range(n_steps - 2, -1, -1):
        path[step] = phi[int(path[step + 1]), int(step + 1)]
    return path, delta, phi
# --- Worked example: a 4-state HMM emitting 5 observable symbols ---
# State 0 is absorbing (transition row [1,0,0,0]) and emits only symbol 0,
# so it acts as a terminal state; the chain starts in state 1 (pi).
pi = [0,1,0,0]
a = np.matrix([[1.0,0.0,0.0,0.0],
               [0.2,0.3,0.1,0.4],
               [0.2,0.5,0.2,0.1],
               [0.7,0.1,0.1,0.1]])
b = np.matrix([[1.0,0.0,0.0,0.0,0.0],
               [0.0,0.3,0.4,0.1,0.2],
               [0.0,0.1,0.1,0.7,0.1],
               [0.0,0.5,0.2,0.1,0.2]])
#a = a[1:,1:]
#b = b[1:,1:]
#pi = pi[1:]
obs = np.array([1, 1, 3, 2, 0])
#a.shape, b.shape, obs.shape
# Run the forward pass and compare the two decoders' state paths.
alpha, c = HMM_forward(pi, a, b, obs)
path1 = Viterbi(pi, a, b, obs)[0]
path2 = HMM_Viterbi(pi, a, b, obs)
path1, path2
# Fit a fresh 4-state model to a longer sequence with Baum-Welch, then
# decode the same sequence under the estimated parameters.
obs = np.array([1, 1, 3, 2, 0, 1, 2, 3, 2, 1, 1, 2, 3])
pi, a, b = HMM_BaumWelch(obs, 4)
path1 = Viterbi(pi, a, b, obs)[0]
path2 = HMM_Viterbi(pi, a, b, obs)
path1, path2
# --- Synthetic regime data: four consecutive blocks of N Gaussian draws ---
# Regimes differ in mean/volatility; the last block repeats the first.
N = 10
mu1, sigma1 = 0.5, 1.0
mu2, sigma2 = -0.5, 3.0
mu3, sigma3 = 0, 2.0
r1 = np.random.normal(mu1, sigma1, N)
r2 = np.random.normal(mu2, sigma2, N)
r3 = np.random.normal(mu3, sigma3, N)
r4 = np.random.normal(mu1, sigma1, N)
r = np.concatenate([r1,r2,r3,r4])
#r = np.cumsum(r)
plt.plot(r);
# One-hot encode the shifted series.
# NOTE(review): `enc.n_values_` is the number of categories per feature,
# not the encoded data, so using it as `dir_r` below looks like a bug
# (and the attribute was removed in newer scikit-learn) - verify intent.
from sklearn.preprocessing import OneHotEncoder
enc = OneHotEncoder()
enc.fit(np.atleast_2d(10+r))
enc.n_values_, enc.feature_indices_, enc.transform(np.atleast_2d(10+r)).toarray()
dir_r = enc.n_values_
dir_r -= min(dir_r)
obs = dir_r.astype("int")
# Fit an HMM to the integer sequence and decode it with both decoders.
pi, a, b = HMM_BaumWelch(obs, max(obs))
path1 = Viterbi(pi, a, b, obs)[0]
path2 = HMM_Viterbi(pi, a, b, obs)
path1, path2
plt.plot(-dir_r, 'k-')
plt.plot(path2, 'r-')
plt.plot(path1, 'b-');
path2, max(path2)
# One-step-ahead forecast: from the decoded state `last`, pick the
# observable symbol i maximizing sum_j a[last, j] * b[j, i].
last = int(path2[-3])
#a[last, :]*b[:, i]
maxstate = -10
index = -1
for i in range(max(path2.astype("int"))+1):
    state = np.sum(a[last, :]*b[:, i])
    print(i, state)
    if state > maxstate:
        maxstate = state
        index = i
maxstate, index
# --- Load intraday FX close quotes from a pickled dict of DataFrames ---
import datetime
import pickle
# NOTE(review): the file has a .csv extension but is read with pickle;
# presumably it holds a dict {"BID"/"ASK" -> {pair -> DataFrame}} - verify.
with open("..\\..\\qsforex\\data\\forex_data_dic.csv", 'rb') as handle:
    data = pickle.load(handle)
pairs = ["AUDUSD", "EURUSD", "GBPUSD", "NZDUSD","USDCAD", "USDCHF", "USDJPY"]
# Seed the merged frame with the first pair's bid/ask close columns.
bid = data["BID"][pairs[0]][["close"]]
ask = data["ASK"][pairs[0]][["close"]]
bid.columns = [pairs[0]+"_bid"]
ask.columns = [pairs[0]+"_ask"]
df = bid.merge(ask, left_index=True, right_index=True, how="outer")
# Outer-join every remaining pair's bid/ask closes on the timestamp index,
# yielding columns ordered pair0_bid, pair0_ask, pair1_bid, pair1_ask, ...
for pair in pairs[1:]:
    dfb = data["BID"][pair][["close"]]
    dfb.columns = [pair+"_bid"]
    dfa = data["ASK"][pair][["close"]]
    dfa.columns = [pair+"_ask"]
    df = df.merge(dfb, left_index=True, right_index=True, how="outer")
    df = df.merge(dfa, left_index=True, right_index=True, how="outer")
dcolumns = df.columns.values
dindex = df.index
print(df.shape, df.columns)
df.head()
# Unpack quotes
dates = df.index.values
# Pack diff and volume for training.
# Scale percent changes of the 5th price column into pip-like units;
# first 200 rows are the train sample, the remainder the test sample.
X_train = 100000*df.iloc[:200,4:5].pct_change().dropna().values.ravel()
X_test = 100000*df.iloc[200:, 4:5].pct_change().dropna().values.ravel()
#x_train += np.abs(min(x_train))+1
#x_test += np.abs(min(x_test))+1
# Reduce each return to its sign, shifted to {0, 1, 2} = {down, flat, up}.
x_train = np.sign(X_train) + 1
x_test = np.sign(X_test) + 1
x_train = x_train.astype("int")
x_test = x_test.astype("int")
plt.plot(x_train);
x_train[-20:], x_test[10:30]
# Fit a 3-state discrete HMM to the sign sequence and decode it both ways.
obs = x_train
pi, a, b = HMM_BaumWelch(obs, 3)
path1 = Viterbi(pi, a, b, obs)[0]
path2 = HMM_Viterbi(pi, a, b, obs)
path1, path2
plt.plot(x_train, 'r')
#plt.plot(path1)
plt.plot(path2, 'b');
def forecast_next(path, a, b):
    """One-step-ahead observable-symbol forecast from the last decoded state.

    parameters:
    path: decoded hidden-state sequence (only the last entry is used)
    a[i,j]: transition probability matrix
    b[j,k]: emission probability matrix
    returns:
    (maxstate, index): the best score sum_j a[last, j] * b[j, i] and the
    observable symbol i attaining it.
    """
    last = int(path[-1])
    maxstate = -100
    index = -1
    # Generalized: scan every observable symbol. The original hard-coded
    # range(3), silently ignoring any symbols beyond the first three.
    n_symbols = np.shape(b)[1]
    for i in range(n_symbols):
        state = np.sum(a[last, :] * b[:, i])
        #print(i, state)
        if state > maxstate:
            maxstate = state
            index = i
    return maxstate, index
# Sanity check: for each of the last 10 steps, forecast the next symbol
# from the truncated decoded path and print it next to the realized one.
for i in range(10):
    p = path2[:-10+i]
    f = forecast_next(p, a, b)
    print(f, path2[-10+i])
def forecast_next_hmm(obs):
    """Fit a 3-state HMM to ``obs``, decode it, and forecast the next symbol.

    Returns (maxstate, index) as produced by forecast_next: the score of the
    best next observable symbol and the symbol itself.
    """
    pi, a, b = HMM_BaumWelch(obs, 3)
    path = HMM_Viterbi(pi, a, b, obs)
    # Delegate the argmax over next symbols to forecast_next instead of
    # duplicating its loop inline (behavior is unchanged).
    return forecast_next(path, a, b)
# Walk-forward check on real data: train a 3-state HMM on a 50-step window
# of sign-encoded returns, forecast the next step, compare to the realized
# sign (both shifted back to {-1, 0, +1} for the printout).
for i in range(20):
    data = 100000*df.iloc[:,4:5].pct_change().dropna().values.ravel()
    X_train = data[i:50+i]
    X_next = data[50+i]
    x_train = np.sign(X_train) + 1
    x_next = np.sign(X_next) + 1
    x_train = x_train.astype("int")
    x_next = x_next.astype("int")
    obs = x_train
    pi, a, b = HMM_BaumWelch(obs, 3)
    path = HMM_Viterbi(pi, a, b, obs)
    last = int(path[-1])
    maxstate = -100
    index = -1
    # NOTE(review): this inner loop variable shadows the outer `i`. The
    # outer `for` reassigns `i` from its own iterator each pass, so the
    # iteration is unaffected, but the shadowing is fragile.
    for i in range(3):
        state = np.sum(a[last, :]*b[:, i])
        if state > maxstate:
            maxstate = state
            index = i
    print(maxstate, index-1, x_next-1)
# Discretize scaled returns into unit-width integer bins to obtain a
# categorical observation sequence for the discrete HMM.
data = 1000000*df.iloc[:1000,4:5].pct_change().dropna().values.ravel()
dmax = int(max(data))+1
dmin = int(min(data))-1
rlab = dmax-dmin   # number of bins / category labels
result = pd.cut(data, bins=range(dmin, dmax+1), labels=range(0, rlab))
cat_data = result.astype("int")
plt.plot(cat_data)
result
# Walk-forward forecast on the binned sequence with a 100-step window.
# NOTE(review): the forecast loop scans only symbols 0..2 (range(3))
# although cat_data spans many more bins - presumably a leftover from the
# sign-encoded experiment above; verify intent.
window = 100
true_x = []
pred_x = []
for i in range(10):
    data = cat_data
    x_train = data[i:window+i]
    x_next = data[window+i]
    #x_train = np.sign(X_train) + 1
    #x_next = np.sign(X_next) + 1
    x_train = x_train.astype("int")
    x_next = x_next.astype("int")
    obs = x_train
    pi, a, b = HMM_BaumWelch(obs, 3)
    path = HMM_Viterbi(pi, a, b, obs)
    last = int(path[-1])
    maxstate = -100
    index = -1
    # Inner loop shadows the outer `i` (harmless here, see note above).
    for i in range(3):
        state = np.sum(a[last, :]*b[:, i])
        if state > maxstate:
            maxstate = state
            index = i
    true_x.append(x_next)
    pred_x.append(index)
    print(maxstate, index, x_next)
plt.plot(true_x, 'r-')
plt.plot(pred_x, 'b-')
# Same walk-forward experiment using hmmlearn's GaussianHMM: fit on a
# 500-step window and *sample* one point from the model as the forecast.
# NOTE(review): n_components=227 appears to match the number of distinct
# bins rather than a small number of regimes, and sampling is stochastic -
# this is a rough experiment, not a calibrated forecaster.
window = 500
true_x = []
pred_x = []
for i in range(20):
    data = cat_data
    x_train = data[i:window+i]
    x_next = data[window+i]
    #x_train = np.sign(X_train) + 1
    #x_next = np.sign(X_next) + 1
    x_train = x_train.astype("int")
    x_next = x_next.astype("int")
    # hmmlearn expects a 2-D (n_samples, n_features) array.
    obs = x_train.reshape(-1, 1)
    model = GaussianHMM(n_components=227, covariance_type="diag", n_iter=1000).fit(obs)
    x_pred, _ = model.sample(1)
    x_pred = x_pred.ravel()[0]
    true_x.append(x_next)
    pred_x.append(x_pred)
    print(x_next, x_pred)
plt.plot(true_x, 'bo')
plt.plot(pred_x, 'r-')
We show how to sample points from a Hidden Markov Model (HMM): we use a 4-component model with specified means and covariances. The plot shows the sequence of observations generated, along with the transitions between them. We can see that, as specified by our transition matrix, there are no transitions between components 1 and 3.
# Prepare parameters for a 4-components HMM Initial population probability
startprob = np.array([0.6, 0.3, 0.1, 0.0])
# The transition matrix, note that there are no transitions possible between component 1 and 3
transmat = np.array([[0.7, 0.2, 0.0, 0.1],
                     [0.3, 0.5, 0.2, 0.0],
                     [0.0, 0.3, 0.5, 0.2],
                     [0.2, 0.0, 0.2, 0.6]])
# The means of each component
means = np.array([[0.0, 0.0],
                  [0.0, 11.0],
                  [9.0, 10.0],
                  [11.0, -1.0]])
# The covariance of each component (isotropic, 0.5 * identity per state)
covars = .5 * np.tile(np.identity(2), (4, 1, 1))
# Build an HMM instance and set parameters
model = hmm.GaussianHMM(n_components=4, covariance_type="full")
# Instead of fitting it from the data, we directly set the estimated parameters, means and covariance of components
model.startprob_ = startprob
model.transmat_ = transmat
model.means_ = means
model.covars_ = covars
# Generate samples (X: 2-D observations, Z: hidden state sequence)
X, Z = model.sample(500)
# Plot the sampled data
plt.plot(X[:, 0], X[:, 1], ".--", label="observations", ms=6,
         mfc="orange", alpha=0.7)
# Indicate the component numbers
for i, m in enumerate(means):
    plt.text(m[0], m[1], 'Component %i' % (i + 1),
             size=17, horizontalalignment='center',
             bbox=dict(alpha=.7, facecolor='w'))
plt.legend(loc='best');
# Each coordinate of the sampled sequence over time.
plt.plot(X[:,0])
plt.plot(X[:,1]);
# --- Build daily features for the S&P 500 regime model ---
start_date ='2009-10-30'
end_date = '2019-10-30'
lags = 0
columns = ["Adj Close", "Volume"]
market = 'SPY'
f1 = '^TYX' # Treasury Yield 30 Years
f2 = '^TNX' # Treasury Yield 10 Years
f3 = '^VIX' # CBOE Volatility Index (original comment wrongly described a yield spread)
symbol = "^GSPC" # symbol for which we want to get volatility forecast
stocks = [symbol, market, f1, f2, f3]
# get data from yahoo finance (today is 10/31/2019)
df = get_yahoo_data(stocks, start_date, end_date, column=columns)
df.head()
# Feature engineering: daily return, absolute return (%), relative volume.
data = df["Adj Close"].copy()
volume = df["Volume"]
data["ret"] = data[symbol].pct_change()
data["abs_ret"] = np.abs(100*data[symbol].pct_change())
data[symbol+"_vol"] = volume[symbol]/volume[market]
data = add_lags(data, "ret", lags)
# Drop the warm-up rows introduced by pct_change/lagging.
# (.iloc replaces the deprecated-and-removed DataFrame.ix indexer; on this
# DatetimeIndex, .ix with integers fell back to positional indexing anyway.)
data = data.iloc[lags+1:,:]
# 85/15 chronological train/test split; .copy() prevents SettingWithCopy
# issues from the in-place drops below.
n = data.shape[0]
n_tr = int(0.85*n)
data_train = data.iloc[:n_tr,:].copy()
data_test = data.iloc[n_tr:,:].copy()
price_train = data_train[symbol]#.values.reshape(-1,1)
price_test = data_test[symbol]#.values.reshape(-1,1)
# columns to drop
drop_cols = [symbol, market, f1, f2, f3, "abs_ret", symbol+"_vol"]
data_train.drop(drop_cols, axis=1, inplace=True)
data_test.drop(drop_cols, axis=1, inplace=True)
print(data_train.shape, data_test.shape)
data_train.head()
plt.rcParams['figure.figsize'] = (14,8)
data["ret"].plot();
# Train Set
X_train = StandardScaler().fit_transform(data_train.fillna(0))
ret_train = data_train["ret"] #.values.reshape(-1,1)
dates_train = data_train.index
# Test Set
# NOTE(review): the scaler is re-fit on the test set instead of reusing the
# train-set fit, which leaks test-set statistics into the features.
X_test = StandardScaler().fit_transform(data_test.fillna(0))
ret_test = data_test["ret"] #.values.reshape(-1,1)
dates_test = data_test.index
# Training the Model
num_hidden_states = 2
hmm_model = GaussianHMM(n_components=num_hidden_states, covariance_type="full", n_iter=100).fit(X_train)
#hmm_model = hmm.GMMHMM(n_components=2, n_mix=100, covariance_type='diag').fit(X)
#hmm_model = mix.GaussianMixture(n_components=2, covariance_type="full", n_init=100, random_state=7).fit(X)
# Train Prediction of Hidden State Sequence
hidden_states = hmm_model.predict(X_train)
# Test Prediction of Hidden State Sequence
hidden_states_test = hmm_model.predict(X_test)
# decode()[0] is the Viterbi log-likelihood of the sequence.
print("Train Score", hmm_model.decode(X_train)[0])
print("Test Score", hmm_model.decode(X_test)[0])
plt.plot(hmm_model.predict_proba(X_train));
print("Transition matrix", hmm_model.transmat_prior, hmm_model.transmat_)
print("Means:", hmm_model.means_prior, hmm_model.means_.ravel())
print("Covariances:", hmm_model.covars_prior, hmm_model.covars_.ravel())
# Overlay decoded states (scaled down to the return range) on the returns.
plt.plot(dates_train, hidden_states/30.0, alpha=0.7)
plt.plot(dates_train, ret_train, alpha=0.5)
plt.plot(dates_train, hmm_model.predict_proba(X_train)[:,1]/30.0, alpha=0.7)
#plt.plot(dates_train, hmm_model.predict_proba(X_train)[:,0]/40.0)
plt.plot(dates_test, hidden_states_test/30.0)
plt.plot(dates_test, ret_test, alpha=0.5)
#plt.plot(dates_test, hmm_model.predict_proba(X_test)[:,0]/30.0)
plt.plot(dates_test, hmm_model.predict_proba(X_test)[:,1]/30.0);
# Decoded states (rescaled to price levels) against the price series.
plt.plot(dates_train, (1+hidden_states)*np.mean(price_train)/2)
plt.plot(dates_train, price_train);
plt.plot(dates_test, (1+hidden_states_test)*np.mean(price_test)/2)
plt.plot(dates_test, price_test);
# States against squared returns (a volatility proxy).
ax = plt.subplot(1, 1, 1)
ax.plot(dates_train, 4*hidden_states*np.mean(ret_train**2))
ax.plot(dates_train, ret_train**2, linewidth=0.3)
ax.set_ylim(0, max(ret_train**2)/4);
ax = plt.subplot(1, 1, 1)
ax.plot(dates_test, hidden_states_test/40.0)
ax.plot(dates_test, ret_test);
# Train prices colored by decoded hidden state, one panel per state.
fig, axs = plt.subplots(hmm_model.n_components, sharex=True, sharey=True)
colours = cm.rainbow(np.linspace(0, 1, hmm_model.n_components))
for i, (ax, colour) in enumerate(zip(axs, colours)):
    mask = hidden_states == i
    ax.plot_date(dates_train[mask], price_train[mask], ".-", c=colour, linewidth=0.5)
    ax.set_title("{0}th hidden state".format(i))
    ax.xaxis.set_major_locator(YearLocator())
    ax.xaxis.set_minor_locator(MonthLocator())
    ax.grid(True)
# Same panels for the test period.
fig, axs = plt.subplots(hmm_model.n_components, sharex=True, sharey=True)
colours = cm.rainbow(np.linspace(0, 1, hmm_model.n_components))
for i, (ax, colour) in enumerate(zip(axs, colours)):
    mask = hidden_states_test == i
    ax.plot_date(dates_test[mask], price_test[mask], ".-", c=colour, linewidth=0.5)
    ax.set_title("{0}th hidden state".format(i))
    ax.xaxis.set_major_locator(YearLocator())
    ax.xaxis.set_minor_locator(MonthLocator())
    ax.grid(True)
# Histogram of train returns conditioned on the decoded state.
ax = plt.subplot(1, 1, 1)
for i in range(hmm_model.n_components):
    mask = hidden_states == i
    ax.hist(ret_train[mask], bins=100, color=colours[i], alpha=0.4, label=str(i)+"th state")
ax.legend(loc="best");
#ax.set_xlim(0, 10)
#ax.set_ylim(0, 100)
# Same histogram on the test period.
ax = plt.subplot(1, 1, 1)
for i in range(hmm_model.n_components):
    mask = hidden_states_test == i
    ax.hist(ret_test[mask], bins=40, color=colours[i], alpha=0.5, label=str(i)+"th state")
ax.legend(loc="best");
#ax.set_xlim(0, 10)
#ax.set_ylim(0, 20)
# Refit with three hidden states and re-decode the train/test sequences.
# Training the Model
num_hidden_states = 3
hmm_model = GaussianHMM(n_components=num_hidden_states, covariance_type="full", n_iter=100).fit(X_train)
# Train Prediction of Hidden State Sequence
hidden_states = hmm_model.predict(X_train)
# Test Prediction of Hidden State Sequence
hidden_states_test = hmm_model.predict(X_test)
print("Train Score", hmm_model.decode(X_train)[0])
print("Test Score", hmm_model.decode(X_test)[0])
# position=-1 short, position=0 no stocks, position=+1 long
# Go long when the decoded state equals `bull`, short when it equals
# `bear`, and carry the previous position forward otherwise (ffill).
bull = 1
bear = 0
positions = np.empty(ret_train.shape[0])
positions[:] = np.NAN
shorts = hidden_states == bear
longs = hidden_states == bull
positions[shorts] = -1
positions[longs] = 1
positions[0] = 0
positions = pd.Series(positions).fillna(method="ffill").values
# BUGFIX: the position decided on day t earns the NEXT day's return.
# The original computed positions[1:] * ret_train[:-1] (tomorrow's position
# times today's return), which is look-ahead bias; align as pos[t]*ret[t+1].
daily_ret = positions[:-1].ravel()*ret_train[1:].ravel()
cum_ret = (1+daily_ret).cumprod()-1
cum_ret_hold = (1+ret_train).cumprod()-1
# Annualized Sharpe ratio and APR for the strategy vs buy-and-hold.
R = np.sqrt(252)*np.mean(daily_ret)/np.std(daily_ret)
R_hold = np.sqrt(252)*np.mean(ret_train)/np.std(ret_train)
APR = np.sign(1+cum_ret[-1])*np.abs(1+cum_ret[-1])**(252.0/daily_ret.shape[0]) - 1
APR_hold = np.sign(1+cum_ret_hold[-1])*np.abs(1+cum_ret_hold[-1])**(252.0/ret_train.shape[0]) - 1
print("Sharpe Ratio HMM:", R, "APR:", APR)
print("Sharpe Ratio Hold:", R_hold, "APR Hold", APR_hold)
plt.plot(positions)
plt.plot(hidden_states);
plt.plot(cum_ret, label="HMM strategy")
plt.plot(cum_ret_hold.values[1:], 'r', label="Hold strategy")
plt.title("Cummulative Return on a Train Set")
plt.legend(loc="best");
# Apply the same regime strategy out of sample (test period).
positions = np.empty(ret_test.shape[0])
positions[:] = np.NAN
shorts = hidden_states_test == bear
longs = hidden_states_test == bull
positions[shorts] = -1
positions[longs] = 1
positions[0] = 0
positions = pd.Series(positions).fillna(method="ffill").values
# BUGFIX: align pos[t] with ret[t+1]; the original positions[1:]*ret[:-1]
# applied tomorrow's position to today's return (look-ahead bias).
daily_ret = positions[:-1].ravel()*ret_test[1:].ravel()
cum_ret = (1+daily_ret).cumprod()-1
cum_ret_hold = (1+ret_test).cumprod()-1
# Annualized Sharpe ratio and APR for the strategy vs buy-and-hold.
R = np.sqrt(252)*np.mean(daily_ret)/np.std(daily_ret)
R_hold = np.sqrt(252)*np.mean(ret_test)/np.std(ret_test)
APR = np.sign(1+cum_ret[-1])*np.abs(1+cum_ret[-1])**(252.0/daily_ret.shape[0]) - 1
APR_hold = np.sign(1+cum_ret_hold[-1])*np.abs(1+cum_ret_hold[-1])**(252.0/ret_test.shape[0]) - 1
print("Sharpe Ratio HMM:", R, "APR:", APR)
print("Sharpe Ratio Hold:", R_hold, "APR Hold", APR_hold)
plt.plot(cum_ret, label="HMM strategy")
plt.plot(cum_ret_hold.values[1:], 'r', label="Hold strategy")
plt.title("Cummulative Return on a Test Set")
plt.legend(loc="best");
Get quotes from Quandl
# --- INTC daily data from Quandl: price diffs standardized for the HMM ---
quotes = quandl.get("WIKI/INTC", trim_start = "1995-10-30", trim_end = "2019-10-30")
X = quotes[['Adj. Close']].diff()
X["volume"] = quotes['Adj. Volume']
X["diff"] = X['Adj. Close']
X.drop(columns="Adj. Close", inplace=True)
X.dropna(inplace=True)
X.head()
# 90/10 chronological split.
# (.iloc replaces the deprecated-and-removed DataFrame.ix indexer; with
# integer arguments on this DatetimeIndex, .ix was positional anyway.)
n = X.shape[0]
n_tr = int(0.9*n)
x_train = X.iloc[:n_tr,:]
x_test = X.iloc[n_tr:,:]
X_train = x_train["diff"].values.reshape(-1,1)
X_test = x_test["diff"].values.reshape(-1,1)
X_all = X["diff"].values.reshape(-1,1)
dates_train = x_train.index
dates_test = x_test.index
dates = X.index
# Standardize using statistics fit on the training window only.
ssc = StandardScaler().fit(X_train)
X_train = ssc.transform(X_train)
X_test = ssc.transform(X_test)
X_all = ssc.transform(X_all)
print("fitting to HMM and decoding ...", end="")
# Make an HMM instance and execute fit
# model = GaussianHMM(n_components=4, covariance_type="diag", n_iter=1000).fit(X_train)
model = hmm.GMMHMM(n_components=3, n_mix=5, covariance_type='diag').fit(X_train)
# Predict the optimal sequence of internal hidden state
hidden_states = model.predict(X_train)
print("done")
Print trained parameters and plot
print("Transition matrix")
print(model.transmat_)
print("\nMeans and vars of each hidden state")
for i in range(model.n_components):
    print("{0}th hidden state".format(i))
    print("mean = ", model.means_[i])
    print("var = ", np.diag(model.covars_[i]))
    print()
# Standardized diffs colored by decoded state, one panel per state.
fig, axs = plt.subplots(model.n_components, sharex=True, sharey=True)
colours = cm.rainbow(np.linspace(0, 1, model.n_components))
for i, (ax, colour) in enumerate(zip(axs, colours)):
    mask = hidden_states == i
    ax.plot_date(dates_train[mask], X_train[mask], "-", c=colour, lw=1.0)
    ax.set_title("{0}th hidden state".format(i))
    # Format the ticks.
    ax.xaxis.set_major_locator(YearLocator())
    ax.xaxis.set_minor_locator(MonthLocator())
    ax.grid(True)
plt.show()
plt.plot(X_train[:,0].cumsum())
model.decode(X_train)
# Sample as many points as the held-out period and compare cumulative sums
# of sampled vs actual (standardized) diffs.
Xnew, Znew = model.sample(X.shape[0]-X_train.shape[0])
plt.plot(dates, np.vstack((X_train, Xnew))[:,0].cumsum(), label="trained and sampled data")
plt.plot(dates, X_all.cumsum(), label="actual data")
plt.legend(loc="best")
plt.title("Cummulative return for actual and predicted data");
plt.plot(dates_test, Xnew[:,0], label="sampled data")
plt.plot(dates_test, X_test, label="test data")
plt.legend(loc="best")
plt.title("Actual and Sampled Data on the Test Set");
plt.plot(dates_test, Xnew[:,0].cumsum(), label="sampled data")
plt.plot(dates_test, X_test.cumsum(), label="test data")
plt.legend(loc="best")
plt.title("Cummulative Sum of Actual and Sampled Data on the Test Set");
import json
import os
import sys
class MyHmm(object): # base class for different HMM models
    """Discrete HMM over named states/symbols stored as nested dicts.

    Implements the forward and backward probability passes, Viterbi
    decoding, and a single Baum-Welch (forward-backward) re-estimation
    step that updates A, B and pi in place.
    """
    def __init__(self, model_name):
        # model is (A, B, pi) where A = Transition probs, B = Emission Probs, pi = initial distribution
        # a model can be initialized to random parameters using a json file that has a random params model
        if model_name == None:
            print("Fatal Error: You should provide the model file name")
            sys.exit()
        # NOTE(review): despite the parameter name, this notebook passes the
        # model dict itself, not a file name (see comment below).
        self.model = model_name # here, I will input the data directly, json.loads(open(model_name).read())["hmm"]
        self.A = self.model["A"]
        self.states = list(self.A.keys()) # get the list of states
        self.N = len(self.states) # number of states of the model
        self.B = self.model["B"]
        self.symbols = list(self.B.values())[0].keys() # get the list of symbols, assume that all symbols are listed in the B matrix
        self.M = len(self.symbols) # number of observable symbols of the model
        self.pi = self.model["pi"]
        return
    def backward(self, obs):
        """Backward pass: fills self.bwk and returns P(obs | model)."""
        self.bwk = [{} for t in range(len(obs))]
        T = len(obs)
        # Initialize base cases (t == T)
        for y in self.states:
            self.bwk[T-1][y] = 1 #self.A[y]["Final"] #self.pi[y] * self.B[y][obs[0]]
        for t in reversed(range(T-1)):
            for y in self.states:
                self.bwk[t][y] = sum((self.bwk[t+1][y1] * self.A[y][y1] * self.B[y1][obs[t+1]]) for y1 in self.states)
        # Total observation likelihood recovered from the backward table.
        prob = sum((self.pi[y]* self.B[y][obs[0]] * self.bwk[0][y]) for y in self.states)
        return prob
    def forward(self, obs):
        """Forward pass: fills self.fwd and returns P(obs | model)."""
        self.fwd = [{}]
        # Initialize base cases (t == 0)
        for y in self.states:
            self.fwd[0][y] = self.pi[y] * self.B[y][obs[0]]
        # Run Forward algorithm for t > 0
        for t in range(1, len(obs)):
            self.fwd.append({})
            for y in self.states:
                self.fwd[t][y] = sum((self.fwd[t-1][y0] * self.A[y0][y] * self.B[y][obs[t]]) for y0 in self.states)
        prob = sum((self.fwd[len(obs) - 1][s]) for s in self.states)
        return prob
    def viterbi(self, obs):
        """Viterbi decoding: returns (max path probability, best state path)."""
        vit = [{}]
        path = {}
        # Initialize base cases (t == 0)
        for y in self.states:
            vit[0][y] = self.pi[y] * self.B[y][obs[0]]
            path[y] = [y]
        # Run Viterbi for t > 0
        for t in range(1, len(obs)):
            vit.append({})
            newpath = {}
            for y in self.states:
                (prob, state) = max((vit[t-1][y0] * self.A[y0][y] * self.B[y][obs[t]], y0) for y0 in self.states)
                vit[t][y] = prob
                newpath[y] = path[state] + [y]
            # Don't need to remember the old paths
            path = newpath
        n = 0 # if only one element is observed max is sought in the initialization values
        # NOTE: relies on the loop variable t leaking out of the for-loop
        # above (t == len(obs) - 1 after the loop completes).
        if len(obs)!=1:
            n = t
        (prob, state) = max((vit[n][y], y) for y in self.states)
        return (prob, path[state])
    def forward_backward(self, obs): # returns model given the initial model and observations
        """One Baum-Welch (EM) step: re-estimates A, B, pi in place and
        returns them. Calls forward() and backward() itself to build the
        alpha/beta tables."""
        gamma = [{} for t in range(len(obs))] # this is needed to keep track of finding a state i at a time t for all i and all t
        zi = [{} for t in range(len(obs) - 1)] # this is needed to keep track of finding a state i at a time t and j at a time (t+1) for all i and all j and all t
        # get alpha and beta tables computes
        p_obs = self.forward(obs)
        self.backward(obs)
        # compute gamma values
        for t in range(len(obs)):
            for y in self.states:
                gamma[t][y] = (self.fwd[t][y] * self.bwk[t][y]) / p_obs
                if t == 0:
                    self.pi[y] = gamma[t][y]
                #compute zi values up to T - 1
                if t == len(obs) - 1:
                    continue
                zi[t][y] = {}
                for y1 in self.states:
                    zi[t][y][y1] = self.fwd[t][y] * self.A[y][y1] * self.B[y1][obs[t + 1]] * self.bwk[t + 1][y1] / p_obs
        # now that we have gamma and zi let us re-estimate
        for y in self.states:
            for y1 in self.states:
                # we will now compute new a_ij
                val = sum([zi[t][y][y1] for t in range(len(obs) - 1)]) #
                val /= sum([gamma[t][y] for t in range(len(obs) - 1)])
                self.A[y][y1] = val
        # re estimate gamma
        # (emission update: expected counts of emitting symbol k from state
        # y, divided by the expected total occupancy of state y)
        for y in self.states:
            for k in self.symbols: # for all symbols vk
                val = 0.0
                for t in range(len(obs)):
                    if obs[t] == k :
                        val += gamma[t][y]
                val /= sum([gamma[t][y] for t in range(len(obs))])
                self.B[y][k] = val
        return self.A, self.B, self.pi
# Three-coin toy model: hidden states are coins, observations are flips.
model = {
    "A": {
        "Coin 1" : {"Coin 1": 0.6, "Coin 2": 0.3, "Coin 3": 0.1},
        "Coin 2" : {"Coin 1": 0.2, "Coin 2": 0.5, "Coin 3": 0.3},
        "Coin 3" : {"Coin 1": 0.3, "Coin 2": 0.2, "Coin 3": 0.5}
    },
    "B": {
        "Coin 1" : {"Heads": 0.7, "Tails": 0.3},
        "Coin 2" : {"Heads": 0.3, "Tails": 0.7},
        "Coin 3" : {"Heads": 0.5, "Tails": 0.5}
    },
    "pi": {"Coin 1": 0.4, "Coin 2": 0.3, "Coin 3": 0.3}
}
observations = ("Heads", "Tails", "Heads", "Heads", "Heads", "Tails", "Tails")
# BUGFIX: the original bound this instance to the name `hmm`, shadowing the
# `hmm` module imported from hmmlearn at the top of the file.
coin_hmm = MyHmm(model)
coin_hmm.viterbi(observations)
# One Baum-Welch re-estimation step (updates A, B, pi in place).
coin_hmm.forward_backward(observations)
# Export this notebook to HTML via the project helper.
notebook_file = r"TS3_Forecasting_with_HMM.ipynb"
html_converter(notebook_file)